"""
Main script to provide some extra miscellaneous functions that are not part of any of the other mixins yet
"""
# pylint: disable=E1101
import sys
import psutil
import binarycpython
[docs]class miscellaneous_functions:
"""
Extension for the Population class containing some miscellaneous functions
"""
def __init__(self, **kwargs):
"""
Init function for the spacing_functions class
"""
return
[docs] def my_hostnames(self):
"""
Function to find the hostnames of the current filesystem
"""
if self.hostnameslist is None:
self.hostnameslist = binarycpython.utils.functions.hostnames()
return self.hostnameslist
[docs] def expand_args_by_hostname(self, cmdline_args):
"""
Expand a set of arguments by scanning each of them
for host-specific dicts.
Given each arg, either as a string "x=y" or dict {x:y},
determine whether y is a dict and if so does one of the keys
match the current hostname or "default", if so use the
corresponding value for the current machine.
"""
new_cmdline_args = None
# loop over list of cmdline args
if isinstance(cmdline_args, list):
new_cmdline_args = []
for cmdline_arg in cmdline_args:
new_arg = self._match_arg_to_host(arg=cmdline_arg)
new_cmdline_args.append(new_arg)
# loop over a dict of cmdline args
elif isinstance(cmdline_args, dict):
new_cmdline_args = {}
for parameter, value in cmdline_args.items():
new_arg = self._match_arg_to_host(arg={parameter: value})
new_cmdline_args[parameter] = new_arg
return new_cmdline_args
def _match_arg_to_host(self, arg=None, hostnameslist=None, vb=False):
"""
Given an arg, either as a string "x=y" or dict {x:y},
determine whether y is a dict and if so does one of the keys
match the current hostname or "default", if so use the
corresponding value. If not, return the original arg's value.
NOTE: (david): this function could probably be a normal function (hardly any self. calls)
"""
if arg is None:
return None
if hostnameslist is None:
hostnameslist = self.my_hostnames()
if isinstance(arg, dict):
# {parameter: value} dict arg
parameter = list(arg.keys())[0]
value = list(arg.values())[0]
argtype = "dict"
else:
# scalar x=y arg
split = arg.split("=")
if len(split) == 2:
parameter = split[0]
value = split[1]
argtype = "list"
else:
parameter = None
value = None
argtype = None
if parameter:
self.vb_debug(
f"_match_arg_to_host: {parameter} = {value} (argtype = {argtype})"
)
try:
self.vb_debug("Try to eval", value)
if isinstance(value, str):
# string : eval it
_x = eval(value)
else:
_x = value
# if dict, match to hostnames or 'default'
if isinstance(_x, dict):
_match = None
for host in hostnameslist:
self.vb_debug(f"check host={host}")
_match = _x.get(host, None)
if _match:
self.vb_debug(
f"Parameter {parameter} matches a value set by host={host} -> {_match}"
)
break
# no match? try 'default'
if not _match:
_match = _x.get("default", None)
# no match? error
if not _match:
self.vb_debug(
f"Parameter {parameter} is a dict {value} none of the keys or which matches our hostname ({hostnameslist}) or 'default'. Please update this parameter."
)
sys.exit(1)
else:
self.vb_debug(f"Parameter {parameter} set to value {_match}")
value = _match
except:
pass
if argtype == "list":
# return x=y type argument
new_arg = f"{parameter}={value}"
else:
new_arg = value
self.vb_debug("return arg {} of type {}".format(new_arg, type(value)))
return new_arg
def _set_nprocesses(self):
"""
Function to set the number of processes used in multiprocessing.
Test the value of population_options['num_cores']:
* If 0 or 'logical' or 'all logical', use all logical cores.
* If -1 or 'physical' or 'all physical', use all physical cores.
* If integer >=1 use the number of cores given.
* If a dict, (key,value) pairs are (hostname,num_cores)
for particular machines. We then try to match this machine to
the (hostname,num_cores) pairs to set num_cores. If not found,
default to either the dict "default" setting, or the number
of logical cores available.
* If population_options['num_cores'] is missing, all logical cores are used.
Exit if none of the above are satisfied.
"""
def __nprocs(ncores):
"""
Function to determine the number of processes
"""
if type(ncores) is str:
if ncores == "all" or ncores == "logical" or ncores == "all logical":
# use all logical cores available to us
return max(1, psutil.cpu_count(logical=True))
elif ncores == "all physical" or ncores == "physical":
# use all physical cores available to us
return max(1, psutil.cpu_count(logical=False))
else:
# unknown string : fall back to one CPU
self.vb_error(
f"grid_option['num_cores'] = \"{ncores}\" is not recognised.\nPlease consider either setting this option to a number, 'physical' or 'logical', or a dict of (host,num_cores) pairs."
)
self.exit(code=1)
if type(ncores) is int:
if ncores == -1:
return max(1, psutil.cpu_count(logical=False))
elif ncores == 0:
return max(1, psutil.cpu_count(logical=True))
else:
return ncores
else:
# ncores should be int or string, oops
self.vb_error(
f"grid_option['num_cores'] = \"{ncores}\" is not recognised.\nPlease consider either setting this option to a number, 'physical' or 'logical', or a dict of (host,num_cores) pairs."
)
self.exit(code=1)
self.vb_info(
f"NPROC: pre-eval \"{self.population_options['num_cores']}\" type \"{type(self.population_options['num_cores'])}\""
)
# eval the incoming string if possible, it may be a JSON
# representation of a dict, and if this works we
# set the num_cores option to the eval'd version.
# It may also be a normal string or int, so we can't just
# use the JSON parser.
try:
_x = eval(self.population_options["num_cores"])
self.population_options["num_cores"] = _x
except:
pass
# backwards compatibility
if "amt_cores" in self.population_options:
self.population_options["num_cores"] = self.population_options["amt_cores"]
self.vb_info(
f"NPROC: \"{self.population_options['num_cores']}\" type \"{type(self.population_options['num_cores'])}\""
)
if "num_cores" in self.population_options:
self.vb_info("NPROC found in grid options")
if type(self.population_options["num_cores"]) is dict:
# try to match hostname to the dict keys
hostnameslist = self.my_hostnames()
for host, ncores in self.population_options["num_cores"].items():
# check if we are this host
if host in hostnameslist:
self.population_options["_num_processes"] = __nprocs(ncores)
self.vb_info(
f"SET NPROC = {self.population_options['_num_processes']} FROM HOST {host}"
)
return
# no host match : use default number of cores
if "default" in self.population_options["num_cores"]:
self.population_options["_num_processes"] = __nprocs(
self.population_options["num_cores"]["default"]
)
self.vb_info(
f"SET NPROC = {self.population_options['_num_processes']} FROM default"
)
return
elif type(self.population_options["num_cores"]) is str:
# just use int passed in
self.population_options["_num_processes"] = __nprocs(
self.population_options["num_cores"]
)
self.vb_info(
f"SET NPROC = {self.population_options['_num_processes']} from string passed in"
)
return
elif type(self.population_options["num_cores"]) is int:
# just use int passed in
self.population_options["_num_processes"] = __nprocs(
self.population_options["num_cores"]
)
self.vb_info(
f"SET NPROC = {self.population_options['_num_processes']} from int passed in"
)
return
# no host matched or no 'num_cores' option provided: use all available logical cores
self.population_options["_num_processes"] = max(
1, psutil.cpu_count(logical=True)
)
self.vb_info(
f"SET NPROC = {self.population_options['_num_processes']} because no option given"
)
return